Java Concurrent - AQS & Worker

1. Overview

之前提到过, ThreadPoolExecutor的内部类Worker维护了一个非常基本的锁结构. 方法不多, 可以由名知义:

- lock
- tryLock
- unlock
- isLocked

Worker本身继承了AbstractQueuedSynchronizer(AQS)来实现上述功能的. 事实上, AQS是Java中数个并发工具的基础, 功能也比较复杂. 我们先从这些并发工具中最简单的Worker开始, 之后逐步递进:

- ThreadPoolExecutor#Worker
- CountDownLatch
- Semaphore
- FutureTask (JDK 1.6)
- ReentrantLock
- ConditionObject
- ReadWriteReentrantLock
- AQS总结

2. 一个最简单的独占锁

我们可以基于CAS实现一种锁. 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
* @author shibinfei
*
*/
public class CASLock {

/*
* 锁状态: false 表示当前没有被占用, true表示被占用
*/
private AtomicBoolean lockState = new AtomicBoolean(false);

/*
* 当前持有锁的线程
*/
private Thread lockHolder;

/**
* 尝试获取锁, 如果获取成功返回true, 获取失败立即返回false
* 可重入:
* @return true if success
*/
public boolean tryLock() {
if (lockState.get() == false) {
if (lockState.compareAndSet(false, true)) {
lockHolder = Thread.currentThread();
return true;
}
} else if (lockHolder == Thread.currentThread()) {
// 同一个线程重入
return true;
}
return false;
}

/**
* 解除当前线程锁定
*/
public void unlock() {
if (Thread.currentThread() != lockHolder) {
throw new IllegalMonitorStateException();
}

lockHolder = null;
lockState.set(false);
}

}
  • 类比我们的布尔变量lockState, AQS中使用的是一个整形字段 - state. 在互斥(即同时只能由一个线程持有锁)的场景下, state可能的取值只有0 / 1两种, 分别对应false, true.
  • tryLock只是简单的尝试获取锁, 而在实际的lock方法中, 等待获取锁的线程都是被阻塞住的. 而这些线程的引用, 被维护于一个队列结构中.

3. A Thread-Safe Queue

AQS中另一个主体是一个基于双向链表的队列结构. 不考虑AQS逻辑, 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* A queue based on doubly linked list and compare-and-set pattern
*
* @author shibinfei
*
*/
public class ThreadSafeQueue {

private volatile AtomicReference<Node> head;

private volatile AtomicReference<Node> tail;

public ThreadSafeQueue() {
Node initialNode = new Node();
head.set(initialNode);
tail.set(initialNode);
}

public void enqueue(Node node) {
Node tailNode = tail.get();
for (;;) {
node.prev = tailNode;
if (tail.compareAndSet(tailNode, node)) {
tailNode.next = node;
return;
}
}
}

static class Node {
volatile Node prev;
volatile Node next;
}

}

初始状态下, head, tail指向同一个节点. 如果没有出队列操作, 只是修改tail引用的节点. 观察enqueue方法, 思路和Atomic工具中的getAndSet思路是一致的. 示例中的类和变量在AQS中都可以找到对应.

4. Worker

AQS是一个抽象类. 其他工具都是以继承的方式覆盖AQS的几个钩子方法.

4.1 tryLock

Worker#tryLock和我们的CASLock的基本思想是一致的, 即通过一个原子变量来表示当前锁的状态, 只不过CASLock中使用的是true/false, 而Worker中使用的是1/0, 维持这个状态的是继承于AQS的state字段.

从下面的流程图可以看出来整体逻辑非常简单. 从这里开始, 稍微值得关注是一个问题是哪些方法是Worker重载的, 而哪些是继承的.

当成功拿到锁之后, 会将exclusiveOwnerThread设为当前线程, 根据名称可以看出来, 仅当独占时才会用到此变量.

4.2 isLocked

同样是基于state的简单逻辑: 即AQS#getState() == 0.

4.3 lock

lock的方法比较复杂. 如图:

  • 进入方法时, 首先进行了一次tryAcquire, 如果成功那真是最好的结局. 否则开始”等待”.
  • 等待指的就是当前线程被抽象为一个节点, 进入了队列尾端(See addWaiter) . 就开始”等待”, 随即进入AQS#acquireQueued逻辑.
  • acquireQueued内, 线程进入了一个循环体系中.
    • 仅当满足 当前节点前面节点为头结点 并且 tryAcquire成功 时才能跳出循环. 此时为成功获取锁.
    • 不满足上述条件线程, 会被挂起(park). 如果一个线程此时无法获取锁, 注意它的前置节点waitStatus初始为0, shouldParkAfterFailedAcquire方法中会将其置为Node.SIGNAL. 而在下次循环中, 此线程才被真正挂起.

看到这里, 如果对于下面几个问题有疑问, 先保留.

  • waitStatus是干什么的?
  • 线程在parkAndCheckInterrupt中, 调用的是Thread.interrupted(). 如我们所知, 这个方法会清楚并返回线程的中断状态. 而acquireQueued中如果曾经被中断过, 就会把中断状态返回给acquire, acquire中调用selfInterrupt, 重新将当前线程中断状态设置为`true
  • 何时执行到acquireQueued内部final块调用的cancelAcquire?. 在acquireQueue中的循环, 只有一个出口即满足p == head && tryAcquire(arg). 而此时fail将一定为false, 所以看似一定不会cancelAcquire

Worker看似是一个很典型的锁. 但是注意的是它的构造方法有这么一句

1
setState(-1); // inhibit interrupts until runWorker

初始情况state == -1, 所以compareAndSet(0, 1)是永远无法成功的, 后面对此进行说明. 这里先假设, 初始状态state == 0. 基于一个简单的示例说明数据变化

1
2
3
4
5
6
7
8
9
Worker worker = new Worker();
worker.lock();

new Thread(() -> {
worker.lock();
}).start();

Thread.sleep(1000);
worker.unlock();

这个过程的数据变化如下图(Step 3可以留到下节后再回来看):

4.4 unlock

假如线程A获取了锁, 线程B在等待. 当前专题下

  • WorkertryRelease方法是一定会成功的.

// TODO 流程图 + 实例

回顾

虽然前文曾经表示可以暂时将Worker看作一个ReentrantLock, 然后实际上两者行为还是有一些不一致的. 先来看下, ThreadPoolExecutor都有哪里调用了Worker的锁行为.

  1. 构造方法设置state为-1.
  2. runWorker时先unlock
  3. 在循环中, 每个task执行前后lock, unlock
  4. interruptIdleWorkers前后tryLock, unlock

第1, 2点注释也有说明, 1是为了禁止被interrupt, 2是允许interrupt. 而且由于第一点的存在, 如果当做普通锁, 上来就lock()的话, 是会一直阻塞的, 所以前面把初始状态设置为0.

在大部分情况下, Woker这个同步工具并没有涉及到多线程间通信, 它只被线程池中的线程持有. 而当主线程试图interrupt线程池中线程时, 才有多个线程对锁的争用. ThreadPoolExecutor

  • shutdown中会调用interruptIdleWorkers(tryLock成功再interrupt)
  • shutdownNow调用interruptWorkers(对所有线程都interrupt).

这两个方法中一个会判断getState() >= 0, 一个会tryLock(). 所以当getState() == -1时两个操作都无效.

另外, WorkertryRelease方法很粗暴. 如果有ReentrantLock使用经验会发现, 如果某线程不是锁的持有者但又试图unlock, 会抛出IllegalMonitorStateException. 而Worker则不然, A线程lock后B线程也可以unlock.

单独提取Worker中部分同步相关代码, 可以注意下那些方法是Override的, 以及暴露的方法和AQS的对应关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@SuppressWarnings("serial")
public class Worker extends AbstractQueuedSynchronizer {

public Worker() {
super();
setState(-1);
}

@Override
protected boolean isHeldExclusively() {
return getState() != 0;
}

@Override
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() {
acquire(1);
}

public boolean tryLock() {
return tryAcquire(1);
}

public void unlock() {
release(1);
}

public boolean isLocked() {
return isHeldExclusively();
}

}